compio_runtime\fd\async_fd/
mod.rs1use std::{io, ops::Deref};
2
3use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
4use compio_driver::{
5 AsFd, AsRawFd, BorrowedFd, RawFd, SharedFd, ToSharedFd,
6 op::{BufResultExt, Read, ReadManaged, ResultTakeBuffer, Write},
7};
8use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite, util::Splittable};
9#[cfg(unix)]
10use {
11 compio_buf::{IoVectoredBuf, IoVectoredBufMut},
12 compio_driver::op::{ReadVectored, WriteVectored},
13};
14
15use crate::{Attacher, BorrowedBuffer, BufferPool};
16
17#[cfg(windows)]
18mod windows;
19
20#[cfg(unix)]
21mod unix;
22
23#[derive(Debug)]
25pub struct AsyncFd<T: AsFd> {
26 inner: Attacher<T>,
27}
28
29impl<T: AsFd> AsyncFd<T> {
30 pub fn new(source: T) -> io::Result<Self> {
32 Ok(Self {
33 inner: Attacher::new(source)?,
34 })
35 }
36
37 pub unsafe fn new_unchecked(source: T) -> Self {
44 Self {
45 inner: unsafe { Attacher::new_unchecked(source) },
46 }
47 }
48}
49
50impl<T: AsFd + 'static> AsyncRead for AsyncFd<T> {
51 #[inline]
52 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
53 (&*self).read(buf).await
54 }
55
56 #[cfg(unix)]
57 #[inline]
58 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
59 (&*self).read_vectored(buf).await
60 }
61}
62
63impl<T: AsFd + 'static> AsyncReadManaged for AsyncFd<T> {
64 type Buffer<'a> = BorrowedBuffer<'a>;
65 type BufferPool = BufferPool;
66
67 async fn read_managed<'a>(
68 &mut self,
69 buffer_pool: &'a Self::BufferPool,
70 len: usize,
71 ) -> io::Result<Self::Buffer<'a>> {
72 (&*self).read_managed(buffer_pool, len).await
73 }
74}
75
76impl<T: AsFd + 'static> AsyncReadManaged for &AsyncFd<T> {
77 type Buffer<'a> = BorrowedBuffer<'a>;
78 type BufferPool = BufferPool;
79
80 async fn read_managed<'a>(
81 &mut self,
82 buffer_pool: &'a Self::BufferPool,
83 len: usize,
84 ) -> io::Result<Self::Buffer<'a>> {
85 let fd = self.to_shared_fd();
86 let buffer_pool = buffer_pool.try_inner()?;
87 let op = ReadManaged::new(fd, buffer_pool, len)?;
88 crate::submit(op)
89 .with_extra()
90 .await
91 .take_buffer(buffer_pool)
92 }
93}
94
95impl<T: AsFd + 'static> AsyncRead for &AsyncFd<T> {
96 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
97 let fd = self.inner.to_shared_fd();
98 let op = Read::new(fd, buf);
99 let res = crate::submit(op).await.into_inner();
100 unsafe { res.map_advanced() }
101 }
102
103 #[cfg(unix)]
104 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
105 use compio_driver::op::VecBufResultExt;
106
107 let fd = self.inner.to_shared_fd();
108 let op = ReadVectored::new(fd, buf);
109 let res = crate::submit(op).await.into_inner();
110 unsafe { res.map_vec_advanced() }
111 }
112}
113
114impl<T: AsFd + 'static> AsyncWrite for AsyncFd<T> {
115 #[inline]
116 async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
117 (&*self).write(buf).await
118 }
119
120 #[cfg(unix)]
121 #[inline]
122 async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
123 (&*self).write_vectored(buf).await
124 }
125
126 #[inline]
127 async fn flush(&mut self) -> io::Result<()> {
128 (&*self).flush().await
129 }
130
131 #[inline]
132 async fn shutdown(&mut self) -> io::Result<()> {
133 (&*self).shutdown().await
134 }
135}
136
137impl<T: AsFd + 'static> AsyncWrite for &AsyncFd<T> {
138 async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
139 let fd = self.inner.to_shared_fd();
140 let op = Write::new(fd, buf);
141 crate::submit(op).await.into_inner()
142 }
143
144 #[cfg(unix)]
145 async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
146 let fd = self.inner.to_shared_fd();
147 let op = WriteVectored::new(fd, buf);
148 crate::submit(op).await.into_inner()
149 }
150
151 async fn flush(&mut self) -> io::Result<()> {
152 Ok(())
153 }
154
155 async fn shutdown(&mut self) -> io::Result<()> {
156 Ok(())
157 }
158}
159
160impl<T: AsFd> IntoInner for AsyncFd<T> {
161 type Inner = SharedFd<T>;
162
163 fn into_inner(self) -> Self::Inner {
164 self.inner.into_inner()
165 }
166}
167
168impl<T: AsFd> AsFd for AsyncFd<T> {
169 fn as_fd(&self) -> BorrowedFd<'_> {
170 self.inner.as_fd()
171 }
172}
173
174impl<T: AsFd> AsRawFd for AsyncFd<T> {
175 fn as_raw_fd(&self) -> RawFd {
176 self.inner.as_fd().as_raw_fd()
177 }
178}
179
180impl<T: AsFd> ToSharedFd<T> for AsyncFd<T> {
181 fn to_shared_fd(&self) -> SharedFd<T> {
182 self.inner.to_shared_fd()
183 }
184}
185
186impl<T: AsFd> Clone for AsyncFd<T> {
187 fn clone(&self) -> Self {
188 Self {
189 inner: self.inner.clone(),
190 }
191 }
192}
193
194impl<T: AsFd> Deref for AsyncFd<T> {
195 type Target = T;
196
197 fn deref(&self) -> &Self::Target {
198 &self.inner
199 }
200}
201
202impl<T: AsFd> Splittable for AsyncFd<T> {
203 type ReadHalf = Self;
204 type WriteHalf = Self;
205
206 fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
207 (self.clone(), self)
208 }
209}